package com.isyscore.os.metadata.kettle.step;

import com.isyscore.os.metadata.enums.KettleDataFlowNodeType;
import com.isyscore.os.metadata.kettle.base.AbstractStep;
import com.isyscore.os.metadata.kettle.base.FlowConfig;
import com.isyscore.os.metadata.kettle.base.FlowHup;
import com.isyscore.os.metadata.kettle.base.Step;
import com.isyscore.os.metadata.kettle.vis.*;
import com.isyscore.os.metadata.utils.StringEscapeHelper;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.plugins.PluginInterface;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.pentaho.di.trans.step.errorhandling.StreamInterface;
import org.pentaho.di.trans.steps.mergejoin.MergeJoinMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class MergeJoinStep extends AbstractStep {

    @Override
    public StepMeta decode(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {
        MergeJoinNode node = (MergeJoinNode) step;
        PluginRegistry registry = PluginRegistry.getInstance();
        PluginInterface sp = registry.findPluginWithId(StepPluginType.class, KettleDataFlowNodeType.MergeJoin.name());
        StepMetaInterface stepMetaInterface = (StepMetaInterface) registry.loadClass(sp);

        MergeJoinMeta mergeJoinMeta = (MergeJoinMeta) stepMetaInterface;

        List<StreamInterface> infoStreams = mergeJoinMeta.getStepIOMeta().getInfoStreams();
        StreamInterface referenceStream = infoStreams.get(0);
        StreamInterface compareStream = infoStreams.get(1);


        StepMeta one = new StepMeta();
        one.setName(node.getStepOne());
        referenceStream.setStepMeta(one);

        StepMeta two = new StepMeta();
        two.setName(node.getStepTwo());
        compareStream.setStepMeta(two);

        mergeJoinMeta.setJoinType(node.getJoinType().getSymbol());

        String[] keyFields1 = new String[node.getConnectFields().size()];
        String[] keyFields2 = new String[node.getConnectFields().size()];

        for (int i = 0; i < node.getConnectFields().size(); i++) {
            FieldTuple fieldTuple = node.getConnectFields().get(i);
            keyFields1[i] = fieldTuple.getFromField();
            keyFields2[i] = fieldTuple.getToField();
        }
        mergeJoinMeta.setKeyFields1(keyFields1);

        mergeJoinMeta.setKeyFields2(keyFields2);


        StepMeta stepMeta = new StepMeta(KettleDataFlowNodeType.MergeJoin.name(), node.getNodeName(), stepMetaInterface);
        stepMeta.setDraw(true);
//        transMeta.addStep(stepMeta);
        return stepMeta;
    }

    @Override
    public StepMeta after(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {

        MergeJoinNode mergeJoinNode = (MergeJoinNode) step;
        List<FlowHup> transHups = transGraph.getTransHups();
        List<FlowHup> newTransHups = new ArrayList<>();

        List<String> nodes = new ArrayList<>();


        String orderStepNodeOne = mergeJoinNode.getStepOne() + "-默认排序";
        newTransHups.add(new FlowHup(orderStepNodeOne, step.getNodeName()));
        newTransHups.add(new FlowHup(mergeJoinNode.getStepOne(), orderStepNodeOne));
        nodes.add(orderStepNodeOne);

        String orderStepNodeTwo = mergeJoinNode.getStepTwo() + "-默认排序";
        newTransHups.add(new FlowHup(orderStepNodeTwo, step.getNodeName()));
        newTransHups.add(new FlowHup(mergeJoinNode.getStepTwo(), orderStepNodeTwo));
        nodes.add(orderStepNodeTwo);

        for (FlowHup transHup : transHups) {
            if (!transHup.getTo().equals(step.getNodeName())) {
                newTransHups.add(transHup);
            }
        }


        mergeJoinNode.setStepOne(orderStepNodeOne);
        mergeJoinNode.setStepTwo(orderStepNodeTwo);
        StepMeta mjStep = decode(mergeJoinNode, databases, transMeta, transGraph);
        updateStep(transMeta, mergeJoinNode.getNodeName(),mjStep);


        String oneNode = nodes.get(0);
        SortRowsStep sortRowsStepOne = new SortRowsStep();
        SortRowsNode visNodeOne = new SortRowsNode();
        visNodeOne.setNodeName(oneNode);
        List<SortField> oneF = mergeJoinNode.getConnectFields().stream().map(f -> new SortField(f.getFromField(), true)).collect(Collectors.toList());
        visNodeOne.setSortFields(oneF);
        StepMeta sortMetaOne = sortRowsStepOne.decode(visNodeOne, databases, transMeta, transGraph);
        sortMetaOne.setParentTransMeta(transMeta);
        transMeta.getSteps().add(sortMetaOne);


        String twoNode = nodes.get(1);
        SortRowsStep sortRowsStepTwo = new SortRowsStep();
        SortRowsNode visNodeTwo = new SortRowsNode();
        visNodeTwo.setNodeName(twoNode);
        List<SortField> twoF = mergeJoinNode.getConnectFields().stream().map(f -> new SortField(f.getToField(), true)).collect(Collectors.toList());
        visNodeTwo.setSortFields(twoF);
        StepMeta sortMetaTwo = sortRowsStepTwo.decode(visNodeTwo, databases, transMeta, transGraph);

        transMeta.getSteps().add(sortMetaTwo);
        List<TransHopMeta> hopMetas = newTransHups.stream().map(h -> new TransHopMeta(getStep(transMeta, h.getFrom()), getStep(transMeta, h.getTo()))).collect(Collectors.toList());
        sortMetaTwo.setParentTransMeta(transMeta);
        transMeta.setTransHops(hopMetas);

        transGraph.setTransHups(hopMetas.stream().map(h -> new FlowHup(h.getFromStep().getName(), h.getToStep().getName())).collect(Collectors.toList()));
        return null;
    }
}
